package cn.doitedu.test;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Flink的度量API
 * 统计处理数据的条数、处理数据的时间、每秒处理的数据的条数
 */

/**
 *
 *  可视化报表工具：grafana，只要配置地址即可
 *
 * 获取flink统计度量的 resi api 地址
 * # GET /v1/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
 * http://localhost:8081/v1/jobs/1bd848cfcfbc02243b7091dc71716f71/vertices/cbc357ccb763df2852fee8c4fc7d55f2/subtasks/metrics/
 * http://localhost:8081/v1/jobs/1bd848cfcfbc02243b7091dc71716f71/vertices/cbc357ccb763df2852fee8c4fc7d55f2/subtasks/metrics?get=Process.g1.eleCount
 */

public class MetricDemo2 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(1);
        //1
        //2
        //a
        //d
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        lines.process(new ProcessFunction<String, Integer>() {

            Counter eleCount;
            Counter numCount;
            Counter errorCount;
            MeterView meterView;


            @Override
            public void open(Configuration parameters) throws Exception {
                MetricGroup metricGroup1 = getRuntimeContext().getMetricGroup().addGroup("g1");
                MetricGroup metricGroup2 = getRuntimeContext().getMetricGroup().addGroup("g2");

                eleCount = metricGroup1.counter("eleCount");
                numCount = metricGroup1.counter("numCount");
                errorCount = metricGroup1.counter("errorCount");

                meterView = metricGroup2.meter("process-rate", new MeterView(2));


            }

            @Override
            public void processElement(String value, Context ctx, Collector<Integer> out) throws Exception {

                //统计每个task处理的数据
                eleCount.inc(); //计数器+1
                try {
                    int num = Integer.parseInt(value);
                    //格式两良好的数据
                    numCount.inc();
                    meterView.markEvent();
                    out.collect(num);
                } catch (NumberFormatException e) {
                    //有问题的数据
                    errorCount.inc();
                }

            }
        }).print();

        env.execute();


    }

}
